
(2.14) Asynchronous stream snapshots#7876

Merged
neilalexander merged 1 commit into main from async-stream-snapshots on Apr 10, 2026

Conversation

@sciascid
Contributor

Commit 0033a15 extended Raft's interface with the ability to create snapshot checkpoints which can be used to install snapshots asynchronously.
This commit reuses the same interface to make stream snapshots asynchronous: writing of the snapshot file and Raft's log compaction steps are done in a separate goroutine.

Signed-off-by: Daniele Sciascia daniele@nats.io

@sciascid
Contributor Author

This pull request tries to reduce the tail latency of publishing to a stream.
Below are results of publishing to a single stream, on a 3 node cluster deployed on a single machine. The table shows throughput in msg/s and various latency measures (min, 50th percentile, 90th percentile and so on) in microseconds. In each experiment the client runs for 60 seconds.

Without the optimization in this pull request we have the following results:

| batch_size | throughput | min | p50 | p90 | p99 | p99.9 | max |
|---|---|---|---|---|---|---|---|
| 1 | 10,243 | 61 | 94 | 108 | 130 | 293 | 28,937 |
| 500 | 282,402 | 967 | 1,492 | 2,128 | 12,355 | 20,522 | 45,815 |

Under low load, with a single client publishing one message at a time, latency varies from 61 microseconds up to ~29 milliseconds for unlucky messages. Under high load, with a single client publishing 500 messages at a time, latency ranges from ~1 ms all the way to ~46 ms.

One cause of the high tail latency is the periodic snapshotting of the stream. Taking a stream snapshot involves writing a file and syncing it to disk, and doing so safely requires more than one call to fsync (710407c). During this time, the monitorStream goroutine is blocked and can't process incoming messages. This patch moves the writing and syncing to disk off the stream's main goroutine.

With the changes in this pull request, tail latency improves significantly:

| batch_size | throughput | min | p50 | p90 | p99 | p99.9 | max |
|---|---|---|---|---|---|---|---|
| 1 | 10,411 | 61 | 94 | 107 | 132 | 253 | 7,377 |
| 500 | 309,047 | 1,052 | 1,484 | 2,091 | 3,707 | 5,419 | 14,415 |

I used the "lats" benchmark client: https://github.com/synadia-labs/lats

@sciascid sciascid force-pushed the async-stream-snapshots branch from 6e05f0f to e32926e Compare February 25, 2026 11:52
@sciascid sciascid marked this pull request as ready for review February 25, 2026 12:41
@sciascid sciascid requested a review from a team as a code owner February 25, 2026 12:41
@sciascid sciascid force-pushed the async-stream-snapshots branch from e32926e to 2f79166 Compare February 25, 2026 14:37
Member

@MauriceVanVeen MauriceVanVeen left a comment


LGTM

@neilalexander
Member

@claude and @codex please review!

@synadia-claude-reviewer

synadia-claude-reviewer bot commented Mar 10, 2026

Claude finished @neilalexander's task in 4m 0s.


Code Review: Asynchronous Stream Snapshots

  • Read the PR diff and changed files
  • Analyze the async implementation for correctness
  • Check for race conditions and error handling
  • Post review feedback

Summary

This PR moves the file write and Raft log compaction of stream snapshots off the monitorStream goroutine and into a separate background goroutine, using the existing RaftNodeCheckpoint interface introduced for meta-layer async snapshots. The approach is sound, the benchmarking shows meaningful tail-latency improvement, and the overall design mirrors the already-reviewed meta-layer async snapshot code.

Observations

Concurrency and mutex usage — looks correct

snapMu is used consistently to protect snapshotting, fallbackSnapshot, and failedSnapshots. The handleInstallResult closure is always called under snapMu, whether from the main goroutine (fallback path) or the async goroutine. The locally captured curState in doSnapshot is safe because only one snapshot can run at a time (guarded by snapshotting).
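
The single-flight guard described here (a boolean under a mutex so only one snapshot runs at a time, plus a failure counter) follows a common Go pattern. The sketch below uses hypothetical names modeled on the review's description, not the server's actual fields.

```go
package main

import "sync"

// snapGuard is a hypothetical sketch of the single-flight pattern the
// review describes: a boolean guarded by a mutex ensures at most one
// snapshot is in flight, and failures are counted for the
// force-snapshot escape hatch.
type snapGuard struct {
	mu              sync.Mutex
	snapshotting    bool
	failedSnapshots int
}

// tryStart returns true if the caller won the right to snapshot.
func (g *snapGuard) tryStart() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.snapshotting {
		return false
	}
	g.snapshotting = true
	return true
}

// finish records the outcome and releases the single-flight slot.
func (g *snapGuard) finish(ok bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.snapshotting = false
	if ok {
		g.failedSnapshots = 0
	} else {
		g.failedSnapshots++
	}
}
```

Because tryStart and finish both take the mutex, state captured between them (like curState in doSnapshot) is safe to use without further locking while the slot is held.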

Behavior change: errCatchupsRunning at CreateSnapshotCheckpoint time

In the old code, errCatchupsRunning returned from n.InstallSnapshot was silently ignored — no warning, no failedSnapshots increment:

```go
// old
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
    s.RateLimitWarnf(...)
    failedSnapshots++
}
```

In the new code, errCatchupsRunning can come from CreateSnapshotCheckpoint, and the guard only excludes errNoSnapAvailable and errNodeClosed:

```go
// new (server/jetstream_cluster.go ~L3020)
if err != errNoSnapAvailable && err != errNodeClosed {
    s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", ...)
    failedSnapshots++
}
```

This means that while a catchup is in progress, each snapshot tick will log a warning and increment failedSnapshots — eventually causing forceSnapshot = failedSnapshots > 4 to kick in and override the catchup check. The meta-layer snapshot code has the same pattern (line ~1554), so this appears to be an intentional alignment rather than a bug, but it is a change in observable behaviour worth confirming.

Shutdown with an in-flight async snapshot

On <-s.quitCh / <-mqch, the code sets fallbackSnapshot = true and then calls doSnapshot(false). If an async snapshot is already running, doSnapshot returns immediately (because snapshotting == true). The async goroutine is registered with s.grWG via s.startGoRoutine, so the server will wait for it to complete. This is correct and safe.

Redundant c.Abort() in handleInstallResult

When c.InstallSnapshot(snap) returns an error, handleInstallResult calls c.Abort() to reset n.snapshotting. However, checkpoint.InstallSnapshot already resets n.snapshotting unconditionally via defer func() { n.snapshotting = false }() (raft.go:1488). The c.Abort() call is a no-op but harmless. Worth a comment clarifying this is intentional defensive cleanup, or removing it.

Double lock/unlock in the timer case (minor)

```go
// server/jetstream_cluster.go ~L3366
case <-t.C:
    snapMu.Lock()
    forceIfFailed := failedSnapshots > 0
    snapMu.Unlock()
    doSnapshot(forceIfFailed) // acquires snapMu again
```

forceIfFailed is read under one critical section and then used in a second. The value could be stale, but doSnapshot re-checks the same condition internally under snapMu, so there's no correctness issue — just a slightly redundant lock cycle. Not worth changing.

startGoRoutine failure handling

If the server is already shutting down and startGoRoutine returns false, snapshotting is reset to false and c.Abort() is called. This correctly keeps the monitorStream and Raft states in sync. The subsequent snapshot attempt on shutdown (doSnapshot(false) from the quitCh/mqch cases) will see snapshotting == false and run the blocking fallback. Well handled.

Verdict

The implementation is correct and safe. The only item worth a second look is the errCatchupsRunning handling change — either it's intentional alignment with the meta-layer behaviour, or the condition should also exclude errCatchupsRunning to preserve the pre-existing silent-ignore semantics. Everything else looks good, and the latency numbers are compelling.


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2f79166212

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +3020 to +3021
```go
if err != errNoSnapAvailable && err != errNodeClosed {
    s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v",
```


P2: Ignore catchup-in-progress on snapshot checkpoint creation

When CreateSnapshotCheckpoint fails because a peer catchup is active (errCatchupsRunning from raft.createSnapshotCheckpointLocked), this branch treats it as a hard failure, increments failedSnapshots, and shortens the compaction ticker. On busy clusters this turns normal catchup behavior into repeated failure accounting and eventually forced snapshots, which is a behavioral regression from the previous stream path that explicitly ignored errCatchupsRunning and can trigger unnecessary blocking snapshots/log churn.


Contributor Author


This would previously happen on the n.InstallSnapshot() call. This PR splits that call into n.CreateSnapshotCheckpoint() followed by InstallSnapshot on the resulting checkpoint.

@neilalexander neilalexander changed the title Asynchronous stream snapshots (2.14) Asynchronous stream snapshots Mar 10, 2026
@wallyqs
Member

wallyqs commented Mar 10, 2026

needs rebase?

@sciascid sciascid force-pushed the async-stream-snapshots branch from 2f79166 to 5885726 Compare March 11, 2026 09:40
@sciascid
Contributor Author

> needs rebase?

done

@sciascid sciascid force-pushed the async-stream-snapshots branch from 5885726 to e800634 Compare March 11, 2026 10:47

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e8006345c5


```go
// to force it through. We might have been catching up a peer for a long period, and this
// protects our log size from growing indefinitely.
forceSnapshot := failedSnapshots > 4
c, err := n.CreateSnapshotCheckpoint(forceSnapshot)
```


P2: Re-check catchup state before installing checkpoint

CreateSnapshotCheckpoint is invoked before flushAllPending() and stateSnapshot(), but the Raft catchup guard (len(n.progress) == 0) is only enforced at checkpoint creation time, not in checkpoint.InstallSnapshot. If a follower catchup starts in that window, this path will still install/compact a non-forced snapshot, which bypasses the previous safety behavior and can interrupt active catchups with avoidable retry/snapshot churn.


Contributor Author


I think this can be fixed easily, but the fix would likely belong in a separate PR (it would be in the Raft code, and it would affect meta snapshots as well).
That said, I don't think this is such a big issue. In unlucky situations, InstallSnapshot may interrupt a catchup that started after the call to CreateSnapshotCheckpoint, but in that case there wouldn't be much work wasted in the first place.

@sciascid sciascid force-pushed the async-stream-snapshots branch from e800634 to 2a42738 Compare April 8, 2026 09:45

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2a42738886


```go
// If this is the first failure, reduce the interval of the snapshot timer.
// This ensures we're not waiting too long for snapshotting to eventually become forced.
if failedSnapshots == 0 {
    t.Reset(compactMinInterval)
```


P2: Guard ticker reset after monitor shutdown

handleInstallResult runs from the async snapshot goroutine, but monitorStream can exit on mqch/qch and run defer t.Stop() before that goroutine completes. If that late callback reaches this t.Reset(...) path (for example when the in-flight snapshot fails because the node/store is being stopped), it re-arms a ticker that no longer has a consumer, leaving an orphaned periodic timer after the monitor is gone. This can accumulate background wakeups during stream moves/scale-downs when snapshots are in flight.


Contributor Author


I wonder if this is a real concern. Sure, if t is Reset after Stop, it may keep ticking unnecessarily. But that also means it is no longer referenced, so it will eventually stop when garbage collected. I'm not sure it's worth introducing complexity for a ticker that will tick every 15 seconds in the worst case.
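
Go's time.Ticker does behave as described: Reset after Stop re-arms the ticker, and an unreferenced ticker is eventually collected. A minimal demonstration of the re-arming behaviour (intervals chosen for illustration only):

```go
package main

import "time"

// resetAfterStop shows that time.Ticker.Reset re-arms a stopped ticker:
// after Stop, Reset(d) causes the ticker to fire again.
func resetAfterStop() bool {
	t := time.NewTicker(time.Hour)
	t.Stop()                       // stopped: no ticks pending
	t.Reset(10 * time.Millisecond) // re-armed despite the earlier Stop
	defer t.Stop()
	select {
	case <-t.C:
		return true // the re-armed ticker fired
	case <-time.After(time.Second):
		return false
	}
}
```

So the orphaned ticker in the review scenario would indeed keep firing into an undrained channel, but only until the last reference (the closure) goes away and the garbage collector reclaims it.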

@sciascid
Contributor Author

sciascid commented Apr 8, 2026

@claude review again

@synadia-claude-reviewer

synadia-claude-reviewer bot commented Apr 8, 2026

Claude finished @sciascid's task in 3m 53s.


Code Review: Asynchronous Stream Snapshots (re-review)

  • Read PR diff and changed files
  • Analyze async implementation for correctness
  • Check open issues from previous reviews
  • Post review feedback

Summary

The implementation is sound and the design is clean. The async goroutine is correctly bounded by s.grWG, shutdown paths set fallbackSnapshot = true before calling doSnapshot, and mutex usage is consistent throughout. The latency numbers are compelling. I've re-examined all open items from prior reviews and the two remaining Codex comments.


Open Items Revisited

1. errCatchupsRunning at CreateSnapshotCheckpoint time (server/jetstream_cluster.go:3015)

This was flagged in both the previous Claude review and by Codex. The old n.InstallSnapshot path explicitly excluded errCatchupsRunning from failedSnapshots:

```go
// old
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
    failedSnapshots++
}
```

The new code does not exclude it at checkpoint creation, so each tick during a catchup will log a warning and increment failedSnapshots. This will eventually force forceSnapshot = failedSnapshots > 4, overriding the catchup guard.

sciascid's response ("This would previously happen on the n.InstallSnapshot() call") is not quite right — the old code did explicitly silence errCatchupsRunning at that path. This is a real behaviour change. However, inspecting the meta-layer counterpart at server/jetstream_cluster.go:1563 confirms it also does not exclude errCatchupsRunning from the checkpoint error path, so this is deliberate alignment with the meta layer rather than an oversight. The forced-snapshot safety valve is intentional design.

If the old silent-ignore behaviour for streams is preferred, the fix is a one-line change to the error guard. Otherwise, a comment clarifying the intentional change would help future readers.

2. Catchup guard only at checkpoint creation, not at install time (server/jetstream_cluster.go:3013)

Codex flagged that a catchup could start in the window between CreateSnapshotCheckpoint and c.InstallSnapshot. This is a real theoretical window. sciascid's response that the fix belongs in raft code and would affect meta snapshots too is correct — checkpoint.InstallSnapshot doesn't re-check len(n.progress). The practical impact is small (a catchup starting in that narrow window would have very little work in progress), and fixing it properly requires raft-level changes that affect both meta and stream layers. Agreed this should be tracked separately.

3. Ticker t.Reset() from async goroutine after monitorStream exits (server/jetstream_cluster.go:3052, 3069)

handleInstallResult calls t.Reset(...) from the async goroutine. If monitorStream has already exited and run defer t.Stop(), this re-arms a stopped ticker. sciascid's response is correct: t is captured by the closure, which is held by the goroutine. After the goroutine exits, there are no more references and the ticker is GC'd. The time.Ticker type is internally thread-safe, so there's no data race. The re-armed ticker may fire once with no consumer, but it will not accumulate. Not a real concern.

4. Redundant c.Abort() in handleInstallResult error path (server/jetstream_cluster.go:3057)

checkpoint.InstallSnapshot unconditionally resets n.snapshotting = false via defer (raft.go:1489–1491). The subsequent c.Abort() in handleInstallResult is therefore a no-op. Harmless. A brief comment would clarify intent for future readers.

5. Shutdown with in-flight async snapshot

When quitCh/mqch fires while an async snapshot goroutine is running, doSnapshot(false) returns immediately (because snapshotting == true) and monitorStream exits. The async goroutine continues via grWG and will complete (successfully or via errSnapAborted if the raft node closes first). If it fails, there is no second blocking-fallback attempt since the monitor has exited — but the raft log remains durable and the next startup replays from the prior snapshot + log. This is acceptable for a shutdown path.
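
The grWG-bounded shutdown described above follows the standard WaitGroup pattern. The sketch below uses hypothetical names (startGoRoutine, grWG) modeled on the review's description, not the actual server implementation:

```go
package main

import "sync"

// server is a minimal stand-in for the pattern described: goroutines
// register with a WaitGroup so shutdown can wait for in-flight work,
// and new goroutines are refused once shutdown has begun.
type server struct {
	mu       sync.Mutex
	shutdown bool
	grWG     sync.WaitGroup
}

// startGoRoutine launches f tracked by grWG, returning false if the
// server is already shutting down (as the review describes).
func (s *server) startGoRoutine(f func()) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shutdown {
		return false
	}
	s.grWG.Add(1)
	go func() {
		defer s.grWG.Done()
		f()
	}()
	return true
}

// waitForGoRoutines marks the server as shutting down and blocks until
// all tracked goroutines complete.
func (s *server) waitForGoRoutines() {
	s.mu.Lock()
	s.shutdown = true
	s.mu.Unlock()
	s.grWG.Wait()
}
```

This is why an in-flight async snapshot cannot outlive the server: shutdown waits on the WaitGroup, and a late startGoRoutine simply returns false so the caller can fall back to the blocking path.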


Verdict

The implementation is correct and safe. The only item worth explicit confirmation before merge is whether the errCatchupsRunning behaviour change at CreateSnapshotCheckpoint time is intentional (aligned with meta layer) or should preserve the old silent-ignore semantics. Everything else is either intentionally aligned with the meta-layer pattern or harmless. The latency improvement justifies the change.

@sciascid
Contributor Author

sciascid commented Apr 8, 2026

The only item worth explicit confirmation before merge is whether the errCatchupsRunning behaviour change at CreateSnapshotCheckpoint time is intentional

@claude We should eventually force CreateSnapshotCheckpoint if it keeps failing. So yes, this change is intentional.

@synadia-claude-reviewer

synadia-claude-reviewer bot commented Apr 8, 2026

Claude finished @sciascid's task in 47s.


  • Acknowledge sciascid's clarification on errCatchupsRunning behavior

Clarification Acknowledged

The errCatchupsRunning behavior change at CreateSnapshotCheckpoint is confirmed intentional. The code comment at server/jetstream_cluster.go:3009–3011 already documents the rationale:

```go
// If we had a significant number of failed snapshots, start relaxing Raft-layer checks
// to force it through. We might have been catching up a peer for a long period, and this
// protects our log size from growing indefinitely.
forceSnapshot := failedSnapshots > 4
```

The design is: each errCatchupsRunning tick increments failedSnapshots and shortens the retry interval, so that after 5 consecutive failures the next attempt sets forceSnapshot = true and pushes the snapshot through regardless. This is the intended safety valve to bound log growth during extended catchups — a deliberate improvement over the old silent-ignore behaviour.
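
The escalation described above can be modeled as a small state machine. This is an illustrative sketch with hypothetical names, mirroring the quoted snippets (shorten the timer only on the first failure, force once more than four consecutive failures have accumulated):

```go
package main

// snapshotPolicy is an illustrative model of the retry escalation; the
// names are hypothetical, not the server's actual code.
type snapshotPolicy struct{ failedSnapshots int }

// shouldForce reports whether the next attempt relaxes Raft-layer
// checks, mirroring the quoted `forceSnapshot := failedSnapshots > 4`.
func (p *snapshotPolicy) shouldForce() bool { return p.failedSnapshots > 4 }

// onFailure records a failed attempt and reports whether the snapshot
// timer should be shortened, mirroring the quoted
// `failedSnapshots == 0` check (true only on the first failure).
func (p *snapshotPolicy) onFailure() (shortenInterval bool) {
	shortenInterval = p.failedSnapshots == 0
	p.failedSnapshots++
	return
}

// onSuccess resets the counter once a snapshot goes through.
func (p *snapshotPolicy) onSuccess() { p.failedSnapshots = 0 }
```

Under this model, five consecutive errCatchupsRunning failures are enough for the sixth attempt to be forced, which is the safety valve bounding log growth during extended catchups.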

All open items from prior reviews are now resolved:

| Item | Status |
|---|---|
| errCatchupsRunning at CreateSnapshotCheckpoint — intentional forced-snapshot safety valve | ✅ Confirmed intentional |
| Catchup guard window between CreateSnapshotCheckpoint and InstallSnapshot | ✅ Acknowledged — tracked separately in raft layer |
| Ticker t.Reset() after monitorStream exits | ✅ Non-issue — GC'd once unreferenced, time.Ticker is thread-safe |
| Redundant c.Abort() in handleInstallResult error path | ✅ Harmless no-op, existing code handles it |
| Shutdown with in-flight async snapshot | ✅ Correctly bounded by grWG |

The implementation is correct and safe. Ready to merge.

Member

@MauriceVanVeen MauriceVanVeen left a comment


LGTM

Member

@neilalexander neilalexander left a comment


LGTM

@neilalexander neilalexander merged commit 27b63e3 into main Apr 10, 2026
73 of 75 checks passed
@neilalexander neilalexander deleted the async-stream-snapshots branch April 10, 2026 11:54